gRPC 学习之-中级知识

December 25, 2021

中级知识

借助 gRPC 可以实现不同的进程间通信模式(也称RPC风格);本章将讨论 gRPC 应用的四种通信模式,一元RPC、服务端流RPC、客户端流RPC、以及双向RPC。

一元RPC

一元RPC:一元RPC模式也被称为简单RPC模式,当客户端调用服务端的远程方法时,客户端发送请求到服务端并获得一个响应,与响应一起发送还有元数据。

服务器端流 RPPC 模式

一元RPC模式:gRPC 服务器端和 gRPC 客户端在通信时始终只有一个请求和一个响应。 服务器端流 RPC 模式:gRPC 服务器端在接收到客户端的请求消息后,会发回一个响应序列(简单说就是会发回多个响应)。这种多个响应所组成的序列也被称为流。

例如:

在 OrderManagement 服务中,假设需要实现一个订单搜索功能;利用服务器端流 gRPC 模式时 OrderManagement 服务不会将所有匹配的订单一次性的发送给客户端,而是在找到匹配的订单时,逐步将其发送出去。

grpc-server-stream

  • grpc server
func (s *server) SearchOrders(searchQuery *wrappers.StringValue,stream pb.OrderManagement_SearchOrdersServer) error {
	for key, order := range orderMap {
		log.Print(key, order)
		for _, itemStr := range order.Items {
			log.Print(itemStr)
			if strings.Contains(itemStr, searchQuery.Value) {
				// Send the matching orders in a stream
				err := stream.Send(&order)
				if err != nil {
					return fmt.Errorf("error sending message to stream : %v", err)
				}
				log.Print("Matching Order Found : " + key)
				break
			}
		}
	}
	return nil
}
  • grpc clinet
func main() {
    ....
    searchStream, 	_ := client.SearchOrders(ctx, &wrappers.StringValue{Value: "Google"})
	for {
		searchOrder, err := searchStream.Recv()
		if err == io.EOF {
			log.Print("EOF")
			break
		}

		if err == nil {
			log.Print("Search Result : ", searchOrder)
		}
	}
    ...
}

上述就是 gRPC 服务器端流模式,服务在检索到订单时,通过以流的形式发送给客户端 err := stream.Send(&order) ; 客户端在调用时,返回一个客户端流,它有一个 Recv 方法,调用客户端的 Recv 方法,可以逐个读取服务端写入的数据。当发现流结束时, Recv 会返回 io.EOF

客户端流 RPC 模式

客户端流RPC模式:客户端会发起多个请求给服务端,而不再是单个请求,服务器端则会发送一个响应给客户端。但是,服务端不一定要等到从客户端接受到所有消息后才发送响应。

例如:

在 OrderManagement 服务中希望添加 updateOrders 方法,从而更新一个订单集合。

grpc client stream

  • grpc server : 如下方法定义了一个服务端接收客户端流的方法
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
	ordersStr := "Updated Order IDs : "
	for {
		order, err := stream.Recv()
		if err == io.EOF {
			// 客户端发送完流数据
			return stream.SendAndClose(&wrappers.StringValue{Value: "Orders processed " + ordersStr})
		}

		if err != nil {
			return err
		}
		// Update order
		orderMap[order.Id] = *order

		log.Printf("Order ID : %s - %s", order.Id, "Updated")
		ordersStr += order.Id + ", "
	}
}
  • grpc client
func main(){
    ....省略
    // =========================================
	// Update Orders : Client streaming scenario
	updOrder1 := pb.Order{Id: "102", Items:[]string{"Google Pixel 3A", "Google Pixel Book"}, Destination:"Mountain View, CA", Price:1100.00}
	updOrder2 := pb.Order{Id: "103", Items:[]string{"Apple Watch S4", "Mac Book Pro", "iPad Pro"}, Destination:"San Jose, CA", Price:2800.00}
	updOrder3 := pb.Order{Id: "104", Items:[]string{"Google Home Mini", "Google Nest Hub", "iPad Mini"}, Destination:"Mountain View, CA", Price:2200.00}

	updateStream, err := client.UpdateOrders(ctx)

	if err != nil {
		log.Fatalf("%v.UpdateOrders(_) = _, %v", client, err)
	}

	// Updating order 1
	if err := updateStream.Send(&updOrder1); err != nil {
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)
	}

	// Updating order 2
	if err := updateStream.Send(&updOrder2); err != nil {
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)
	}

	// Updating order 3
	if err := updateStream.Send(&updOrder3); err != nil {
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder3, err)
	}

	updateRes, err := updateStream.CloseAndRecv()
	if err != nil {
		log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
	}
	log.Printf("Update Orders Res : %s", updateRes)

}

客户端以流的形式发送updateStream.Send(&updOrder2)给服务端,一旦所有消息都以流的形式发送出去,客户端就可以将流标记为已完成,并通过 CloseAndRecv 方法来读取服务端的响应。

双向流 RPC 模式

双向流RPC模式:客户端以消息流的形式发送请求到服务器端,服务器端也以消息流的形式进行响应。调用必须由客户端发起,但在此之后,通信完全基于 gRPC 客户端和服务器端的应用程序逻辑。

例如:

在 OrderManagement 服务中,假设需要一个订单处理功能,通过该功能,用户可以发送连续的订单集合,并根据投递地址对它们进行组合发货。

grpc-double-stream

这个业务用例的关键步骤如下所示:

  • 每个订单以独立的 gRPC 消息的形式发送至服务器端。
  • 每个发货组合可能会包含多个订单,它们应该被投递到相同的目的地。
  • 订单是成批处理的,当达到指定的批次大小时,当前创建的所有发货组合都会被发送至客户端。
  • 假设:流中有4个订单,其中有两个订单要发送至X,两个要发送至Y,则可以将其表示为:X、Y、X、Y。如果批次大小为3,那么所创建的订单发货组合会是[X,X],[Y],[Y]。

gRpc Server:主要逻辑就是通过客户端发送过来的订单ID按照指定批次大小以及发货地址进行分组,再把分组后的信息返回给客户端。

const orderBatchSize = 3
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
	batchMarker := 1
	var combinedShipmentMap = make(map[string]pb.CombinedShipment)
	for {
		orderId, err := stream.Recv()
		log.Printf("Reading Proc order : %s", orderId)
		if err == io.EOF {
			// 客户端发送关闭时,需要把服务端已经接收到的进行发送
			log.Printf("EOF : %s", orderId)
			for _, shipment := range combinedShipmentMap {
				if err := stream.Send(&shipment); err != nil {
					return err
				}
			}
			return nil
		}
		if err != nil {
			log.Println(err)
			return err
		}

		destination := orderMap[orderId.GetValue()].Destination
		shipment, found := combinedShipmentMap[destination]

		if found {
			ord := orderMap[orderId.GetValue()]
			shipment.OrdersList = append(shipment.OrdersList, &ord)
			combinedShipmentMap[destination] = shipment
		} else {
			comShip := pb.CombinedShipment{Id: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!", }
			ord := orderMap[orderId.GetValue()]
			comShip.OrdersList = append(shipment.OrdersList, &ord)
			combinedShipmentMap[destination] = comShip
			log.Print(len(comShip.OrdersList), comShip.GetId())
		}

		if batchMarker == orderBatchSize {
			for _, comb := range combinedShipmentMap {
				log.Printf("Shipping : %v -> %v" , comb.Id, len(comb.OrdersList))
				if err := stream.Send(&comb); err != nil {
					return err
				}
			}
			batchMarker = 0
			combinedShipmentMap = make(map[string]pb.CombinedShipment)
		} else {
			batchMarker++
		}
	}
}

gRpc Client:主要逻辑就是发送订单ID给服务端,之后开启 Go协程来读取服务端返回的消息。

func main(){
    ...
    // =========================================
	// Process Order : Bi-di streaming scenario
	streamProcOrder, err := client.ProcessOrders(ctx)
	if err != nil {
		log.Fatalf("%v.ProcessOrders(_) = _, %v", client, err)
	}

	if err := streamProcOrder.Send(&wrappers.StringValue{Value:"102"}); err != nil {
		log.Fatalf("%v.Send(%v) = %v", client, "102", err)
	}

	if err := streamProcOrder.Send(&wrappers.StringValue{Value:"103"}); err != nil {
		log.Fatalf("%v.Send(%v) = %v", client, "103", err)
	}

	if err := streamProcOrder.Send(&wrappers.StringValue{Value:"104"}); err != nil {
		log.Fatalf("%v.Send(%v) = %v", client, "104", err)
	}

	if err := streamProcOrder.Send(&wrappers.StringValue{Value:"101"}); err != nil {
		log.Fatalf("%v.Send(%v) = %v", client, "101", err)
	}

    channel := make(chan struct{})
	go asncClientBidirectionalRPC(streamProcOrder, channel)
	
	if err := streamProcOrder.CloseSend(); err != nil {
		log.Fatal(err)
	}
	channel <- struct{}{}

}

func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {
	for {
		combinedShipment, errProcOrder := streamProcOrder.Recv()
		if errProcOrder == io.EOF { // 判断服务端流是否结束;服务端方法只要有return就意味着流结束流了,可能是因为正常结束,也可能是错误导致结束,总之只要return之后双向流就断开了。
			break
		}
		log.Printf("Combined shipment : ", combinedShipment.OrdersList)
	}
	<-c
}

在双向流中可以通过定义客户端流和服务端流的方式来实现,客户端和服务端都定义了 SendRecv 方法。可以实现双向传输。

// 客户端流
type OrderManagement_ProcessOrdersClient interface {
	Send(*wrappers.StringValue) error
	Recv() (*CombinedShipment, error)
	grpc.ClientStream
}

// 服务端流
type OrderManagement_ProcessOrdersServer interface {
	Send(*CombinedShipment) error
	Recv() (*wrappers.StringValue, error)
	grpc.ServerStream
}

客户端可以并发读取和写入同一个流,输入流和输出流可以独立进行操作。


LRF 记录学习、生活的点滴